package com.stars.easyms.schedule.core;

import com.stars.easyms.schedule.bean.DbScheduleSubTask;
import com.stars.easyms.schedule.bean.DbScheduleTask;
import com.stars.easyms.schedule.constant.DistributedScheduleConstants;
import com.stars.easyms.schedule.service.DistributedScheduleService;
import com.stars.easyms.schedule.enums.*;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 任务抓取线程
 *
 * @author guoguifang
 */
class DistributedScheduleCatcher extends AbstractDistributedScheduleThread {

    /**
     * 是否启动延迟结束
     */
    private AtomicBoolean delayFinished = new AtomicBoolean(false);

    /**
     * 查询当前可执行任务的条件
     */
    private Map<String, Object> queryCondition;

    /**
     * 描述：启动服务器后当批量框架首次启动,为了保证项目的配置以及加载项都加载完成,以及可能由于上次服务器的异常关闭使任务状态出现错误的情况，
     * 因此服务器首次启动时延迟启动批量抓取任务
     */
    @Override
    protected void executeBefore() {
        if (delayFinished.compareAndSet(false, true)) {
            long startDelayTimeMilli = getDistributedScheduleServerConfig().getStartDelayTimeMilli();
            logger.info("First start distributed schedule catcher thread, delay start {} milliseconds!", startDelayTimeMilli);
            sleep(startDelayTimeMilli);
        }
    }

    @Override
    protected void execute() {
        DistributedTaskExecutorManager distributedTaskExecutorManager = DistributedTaskExecutorManager.getSingleInstance();

        // 查询执行任务线程队列是否已满,如果已满等待有可用线程时再继续
        int executableCount;
        while (true) {
            executableCount = distributedTaskExecutorManager.getExecutableCountAndTryLockPoolSize();
            if (executableCount > 0) {
                break;
            }
            awaitBlockMillis(60000);
        }

        // 将当前线程状态置为运行状态
        changeThreadStatus(ThreadStatus.RUNNING);

        // 已分配执行数量
        int executedCount = 0;
        try {
            // 获取当前可执行子任务列表
            List<DbScheduleSubTask> executableSubTaskList = DistributedScheduleService.getInstance().getExecutableSubTask(getQueryCondition());
            if (executableSubTaskList != null && executableSubTaskList.size() > 0) {
                HashSet<Long> runStatusTaskIdSet = new HashSet<>();
                for (int i = 0; i < executableCount; i++) {
                    // 得到一个可执行子任务
                    DbScheduleSubTask executableSubTask = getOneExecutableSubTask(executableSubTaskList, runStatusTaskIdSet);

                    // 如果可执行子任务不为空则从线程池获取一个初始化的线程执行该任务
                    if (executableSubTask != null) {
                        // 获取一个任务执行者
                        DistributedTaskExecutor distributedTaskExecutor = distributedTaskExecutorManager.getOneDistributedTaskExecutor();

                        // 得到一个任务执行线程并把抓取的子任务扔给该执行线程
                        distributedTaskExecutor.setExecutableSubTask(executableSubTask);
                        distributedTaskExecutorManager.execute(distributedTaskExecutor);
                        executedCount++;
                    }
                }
            }
        } finally {
            distributedTaskExecutorManager.unlockPoolSize();
        }

        // 当核心线程全部占用完时需线程短暂阻塞，若没有可执行子任务时需线程空闲等待
        if (executedCount == executableCount) {
            awaitBlockMillis(3000);
        } else {
            awaitIdleMillis(DistributedScheduleConstants.DEFAULT_IDLE_TIME_MILLI);
        }
    }

    /**
     * 描述：查询任务表及子任务表按一定规则的顺序得到所有可执行的任务及子任务,遍历修改子任务的状态为'正在执行',如果修改成功则返回该子任务，如果没有修改成功则continue
     * 排序规则：1优先级排序，优先级数值越大优先级越高；2如果优先级相同，优先执行当前子任务执行数量较少的任务；
     *
     * @return DbScheduleSubTask 一条可执行的子任务
     */
    private DbScheduleSubTask getOneExecutableSubTask(List<DbScheduleSubTask> executableSubTaskList, HashSet<Long> runStatusTaskIdSet) {
        if (executableSubTaskList == null || executableSubTaskList.size() == 0) {
            return null;
        }

        // 遍历可执行子任务列表，如果修改状态成功则在列表中去除该子任务并返回该子任务，如果修改状态失败则在列表中去除该子任务并continue直到size为0
        Iterator<DbScheduleSubTask> iterator = executableSubTaskList.iterator();
        while (iterator.hasNext()) {
            DbScheduleSubTask executableSubTask = iterator.next();

            // 如果主任务状态为"阻塞待处理/失败重试-阻塞待处理"，则修改任务状态 '阻塞待处理/失败重试-阻塞待处理' -> '处理中/失败重试-处理中'
            if (!runStatusTaskIdSet.contains(executableSubTask.getParentId())) {
                TaskStatus taskStatus = TaskStatus.forCode(executableSubTask.getTaskStatus());
                if (taskStatus == TaskStatus.BLOCKED || taskStatus == TaskStatus.RETRY_BLOCKED) {
                    // 封装修改参数
                    Map<String, Object> updateMap = new HashMap<>(4);
                    DbScheduleTask updateTask = executableSubTask.createUpdateTask();
                    updateTask.setStatus(taskStatus == TaskStatus.BLOCKED ? TaskStatus.RUNNING.getCode() : TaskStatus.RETRY_RUNNING.getCode());
                    updateMap.put("dto", updateTask);
                    updateMap.put("run", "true");
                    int updateCount = DistributedScheduleService.getInstance().updateDbScheduleTaskByIdAndVersion(updateMap);
                    if (updateCount > 0 && logger.isDebugEnabled()) {
                        logger.debug("Thread[{}] task[{}] status '{}' change for '{}'!", getThreadName(), executableSubTask.getTaskId(), taskStatus, TaskStatus.forCode(updateTask.getStatus()));
                    }
                }
                runStatusTaskIdSet.add(executableSubTask.getParentId());
            }

            // 修改子任务状态 '阻塞待处理' -> '处理中'
            DbScheduleSubTask updateSubTask = executableSubTask.createUpdateSubTask();
            TaskStatus subTaskStatus = TaskStatus.forCode(executableSubTask.getStatus());
            updateSubTask.setStatus(subTaskStatus == TaskStatus.BLOCKED ? TaskStatus.RUNNING.getCode() : TaskStatus.RETRY_RUNNING.getCode());
            updateSubTask.setExecuteServerIp(getDistributedScheduleServerConfig().getIp());
            updateSubTask.setExecuteServerPort(getDistributedScheduleServerConfig().getPort());
            int updateCount = DistributedScheduleService.getInstance().updateDbScheduleSubTaskById(updateSubTask);
            iterator.remove();
            if (updateCount > 0) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Thread[{}] task[{}] status '{}' change for '{}'!", getThreadName(), executableSubTask.getTaskId(), subTaskStatus, TaskStatus.forCode(updateSubTask.getStatus()));
                }
                executableSubTask.setVersion(executableSubTask.getVersion() + 1);
                return executableSubTask;
            }
        }
        return null;
    }

    /**
     * 得到查询条件
     */
    private Map<String, Object> getQueryCondition() {
        if (queryCondition == null) {
            queryCondition = new HashMap<>(8);
            List<String> blockStatusList = new ArrayList<>();
            blockStatusList.add(TaskStatus.BLOCKED.getCode());
            blockStatusList.add(TaskStatus.RETRY_BLOCKED.getCode());
            queryCondition.put("blockStatusList", blockStatusList);
            List<String> runStatusList = new ArrayList<>();
            runStatusList.add(TaskStatus.RUNNING.getCode());
            runStatusList.add(TaskStatus.RETRY_RUNNING.getCode());
            queryCondition.put("runStatusList", runStatusList);
            queryCondition.put("taskSwitchOpen", Switch.OPEN.getCode());
            queryCondition.put("whiteList", WhiteBlackList.WHITE.getCode());
            queryCondition.put("blackList", WhiteBlackList.BLACK.getCode());
            queryCondition.put("yesOrNo_yes", YesOrNo.YES.getCode());
            queryCondition.put("yesOrNo_no", YesOrNo.NO.getCode());
            queryCondition.put("serverIp", getDistributedScheduleServerConfig().getIp());
            queryCondition.put("serverPort", getDistributedScheduleServerConfig().getPort());
        }
        return queryCondition;
    }

    private static DistributedScheduleCatcher singleInstance;

    static DistributedScheduleCatcher getSingleInstance() {
        if (singleInstance == null) {
            synchronized (DistributedScheduleCatcher.class) {
                if (singleInstance == null) {
                    singleInstance = new DistributedScheduleCatcher();
                }
            }
        }
        return singleInstance;
    }

    private DistributedScheduleCatcher() {
        super();
    }
}
